package com.study.datasource.crawler;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.study.datasource.DatasourceApplication;
import com.study.datasource.bean.CovidBean;
import com.study.datasource.util.HttpUtils;
import com.study.datasource.util.TimeUtils;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.sql.Struct;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * 实现疫情数据爬取
 */
//@RunWith(SpringJUnit4ClassRunner.class)
//@SpringBootTest(classes= DatasourceApplication.class)
    @Component
public class Covid19DataCrawler {
    @Autowired
    private KafkaTemplate kafkaTemplate;

//   springboot和kafka整合测试
//    @Test
//    public void testKafkaTemplate() throws Exception {
//        kafkaTemplate.send("test",1,"abc");
//        Thread.sleep(1000000);
//    }

    // 后续需要将该方法改为定时任务，如每天8点定时爬取疫情数据
    //springboot的定时任务工具
    //@Scheduled(initialDelay = 1000,fixedDelay = 1000)
    //@Scheduled(cron = "0/1 * * * * ?")//每隔一秒执行一次 0 0/1 * * * ?（一分钟）
    //@Scheduled(cron = "0 0 8 * * ?")//每天8点定时执行
    @Scheduled(initialDelay = 1000,fixedDelay = 1000*60*60*24)//首次任务执行的延迟时间和任务之间的间隔时间
//    @Scheduled(cron = "0/5 * * * * ?")//每隔5秒
    public void testCrawling() throws Exception {
        //System.out.println("每个五秒执行一次");
        String datetime = TimeUtils.format(System.currentTimeMillis(), "yyyy-MM-dd");

        final String datatime = TimeUtils.format(System.currentTimeMillis(), "yyyy-MM-dd");

        // 1.爬取指定页面
        final String html = HttpUtils.getHtml("https://ncov.dxy.cn/ncovh5/view/pneumonia");
//        System.out.println(html);

        // 2.解析页面中的指定内容 即id为getAreaStat的标签中的全国疫情数据
        final Document doc = Jsoup.parse(html);
        final String text = doc.select("script[id=getAreaStat]").toString();
        System.out.println(text);

        // 3.使用正则表达式获取JSON格式的疫情数据
        String pattern ="\\[(.*)\\]";//定义正则对象
        final Pattern reg = Pattern.compile(pattern);//编译成正则对象
        final Matcher matcher = reg.matcher(text);//去text中进行匹配
        String  jsonStr="";
        if (matcher.find()){
            jsonStr = matcher.group(0);
            //System.out.println(jsonStr);
        }else {
            System.out.println("no match");
        }

        // 进一步解析JSON
        // 4.将第一层省份数据解析为Javabean
        final List<CovidBean> pCovidBeans = JSON.parseArray(jsonStr, CovidBean.class);
        for(CovidBean pBean : pCovidBeans){// 为省份
            //System.out.println(pBean);
            // 先设置时间字段
            pBean.setDatetime(datetime);
            // 获取cities
            String citysStr = pBean.getCities();
            // 5.将第二层JSON城市数据解析为Javabean
            List<CovidBean> covidBeans = JSON.parseArray(citysStr, CovidBean.class);
            for (CovidBean bean : covidBeans) {
                //System.out.println(bean);
                bean.setDatetime(datetime);
                bean.setPid(pBean.getLocationId());//把省份的id作为城市的pid
                bean.setProvinceShortName(pBean.getProvinceShortName());
                //System.out.println(bean);
                //后续需要将城市疫情数据发送给kafka
                //将Javabean转换为jsonStr再发送给kafka
                String beanStr = JSON.toJSONString(bean);
                //同一个省份的数据到同一个分区,所以获取pid
                kafkaTemplate.send("covid19",bean.getPid(),beanStr);
            }
            // 6.获取第一层（省份数据）每一天的统计数据
            String statisticsDataUrl = pBean.getStatisticsData();
            String statisticsDataStr= HttpUtils.getHtml(statisticsDataUrl);
            // 获取statisticsData字段对应的数据
            JSONObject jsonObject = JSON.parseObject(statisticsDataStr);
            String dataStr = jsonObject.getString("data");
            //System.out.println(dataStr);
            // 将爬取解析出来的每一天的数据设置回省份pBean字段中（之前改字段只是一个URL）
            pBean.setStatisticsData(dataStr);
            pBean.setCities(null);
            //System.out.println(pBean);
            //后续需要将省份疫情数据发送给kafka
            String pBeanStr = JSON.toJSONString(pBean);
            kafkaTemplate.send("covid19",pBean.getLocationId(),pBeanStr);
        }
        //Thread.sleep(1000000);线程阻塞
    }
}
